Lý thuyết của bài

  • Lý thuyết về Apache Spark và RDD tại đây
  • Lý thuyết về DStream tại đây
  • Lý thuyết về MLib tại đây

Cài đặt Spark

Đã cài đặt trước đó
Pasted image 20250402141931.png

web UI của Spark

để bật Web UI, cần đảm bảo đã bật spark trong sbin

$SPARK_HOME/sbin/start-all.sh

Kiểm tra và cài đặt Maven

Cài đặt bằng lệnh

sudo apt update
sudo apt install maven

Kiểm tra

mvn -version

Nếu cài đặt đúng thì sẽ trả về

Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 1.8.0_442, vendor: Private Build, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "5.15.167.4-microsoft-standard-wsl2", arch: "amd64", family: "unix"

Pasted image 20250401113557.png


Tạo Project bằng Maven

Khởi tạo

mvn archetype:generate -DgroupId=com.example -DartifactId=SparkApp -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
  • Lưu ý: nếu bị lỗi có thể do thiếu quyền ở thư mục gì đó. Hãy thử tạo ở home
    Pasted image 20250401115534.png

sau đó sẽ được thư mục có cấu trúc như sau
Pasted image 20250401115610.png

Config

Thêm vào pom.xml

	<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
    
    <dependency>
	    <groupId>org.apache.spark</groupId>
	    <artifactId>spark-core_2.12</artifactId>
	    <version>3.5.3</version>
	</dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.0</version>
    </dependency>

Pasted image 20250401120127.png

Config java

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

Full file

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>SparkApp</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>SparkApp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

  <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>
  
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.5.3</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.0</version>
    </dependency>

    
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

Word Count

Code và dữ liệu

Tạo input txt và đem vào hdfs

xin chao cac ban xin chao Hadoop Bid Data minh xin tu gioi thieu minh len la Luu Vinh Tuong
package spark.main;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
         SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Spark Word Count");

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            String inputPath = "hdfs://vinhtuong-master:9000/input/input_1.txt";
            String outputPath = "hdfs://vinhtuong-master:9000/output/result";

            JavaRDD<String> textFile = sc.textFile(inputPath).cache();

            JavaPairRDD<String, Integer> wordCounts = textFile
                    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                    .mapToPair(word -> new Tuple2<>(word.replaceAll("[^a-zA-Z]", "").toLowerCase(), 1))
                    .reduceByKey(Integer::sum);

            wordCounts.coalesce(1).saveAsTextFile(outputPath);

            System.out.println("Word Count completed. Output saved to " + outputPath);
        }
    }
}

Đóng gói

mvn clean package

Pasted image 20250401124029.png

Sau khi đóng gói, chúng ta sẽ được thư mục target như sau:
Pasted image 20250401124209.png

Chạy file

spark-submit --class spark.main.WordCount --master local[*] target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401125946.png

Check output

Pasted image 20250401130011.png
Pasted image 20250401130059.png


Spark SQL

Code

name,age,city
Alice,30,New York
Bob,25,Los Angeles
Charlie,35,Chicago
David,40,Houston
Emma,22,San Francisco
Frank,28,Seattle
Grace,33,Boston
Henry,27,Denver
package spark.main;

import org.apache.spark.sql.*;

public class SparkSQLExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Spark SQL Example")
                .master("local[*]")
                .getOrCreate();

            Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://vinhtuong-master:9000/input/people.csv");


        System.out.println("Du lieu ban dau:");
        df.show();

        System.out.println("Schema:");
        df.printSchema();

        System.out.println("Chon name va age:");
        df.select("name", "age").show();

        System.out.println("Loc nhung nguoi tren 25 tuoi:");
        df.filter("age > 25").show();

        System.out.println("Nhom theo so tuoi va dem so nguoi:");
        df.groupBy("age").count().show();

        df.write().mode("overwrite").csv("hdfs://vinhtuong-master:9000/output/people");


        spark.stop();
    }
}

Đóng gói

mvn clean package

Pasted image 20250401133149.png
Pasted image 20250401134218.png

Chạy file

spark-submit --class spark.main.SparkSQLExample --master local[*] target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401142737.png

Check output

  • Output bên hdfs sẽ không có gì. Vì những thao tác mình không lưu lại (check trên code) mà chỉ show ra màn hình
    Pasted image 20250401142806.png

  • Nên sẽ check file thực thi ở mục trên
    Pasted image 20250401143054.png
    Pasted image 20250401143106.png
    Pasted image 20250401143114.png
    Pasted image 20250401143122.png
    Pasted image 20250401143157.png
    Pasted image 20250401143215.png


Phân tích dữ liệu bán lẻ với Spark SQL

Chuẩn bị dữ liệu

Lấy dữ liệu tại đây.
Pasted image 20250401145826.png

Có tổng bao nhiêu giao dịch, sản phẩm và khách hàng khác nhau?

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_1 {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("Part-1")
				.master("local")
				.getOrCreate();
		
		Dataset<Row> data = spark.read()
				.option("inferSchema", true)
				.option("header", true)
				.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
				
		// number of customer distinct
		// except 1 because there are value is null of information customer ID
		long cntCustomers = data.select("CustomerID").distinct().count() - 1; 
		
		// number of product distinct
		long cntProdcts = data.select("StockCode").distinct().count();
		
		// number of invoice distinct
		long cntInvoices = data.select("InvoiceNo").distinct().count();
		
		// print 
		System.out.println("Number of customer distinct: " + cntCustomers); 
		System.out.println("Number of product distinct: " + cntProdcts);
		System.out.println("Number of invoice distinct: " + cntInvoices);
		
	}
}

Đóng gói

mvn clean package

Pasted image 20250401151545.png

Chạy file

spark-submit --class spark.main.part_1 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401151511.png

Check output

  • Vì bài này không lưu vào hdfs mà chỉ hiển ra màn hình nên chỉ xem lúc chạy file
    Pasted image 20250401151337.png

Tỉ lệ khách hàng không có thông tin

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_2 {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("Part-2")
				.master("local")
				.getOrCreate();
		
		Dataset<Row> data = spark.read()
				.option("inferSchema", true)
				.option("header", true)
				.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
				
		// get number of customer (done part 1)
		long cntCustomers = data.select("CustomerID").count();
		
		// get number of customer no information
		long cntCustomersNoInfor = data.select("CustomerID").filter(data.col("CustomerID").isNull()).count();
		
		double ratio = (double) cntCustomersNoInfor / cntCustomers * 100;
		System.out.printf("Ratio no information: %f \n", ratio);
	}
}

Đóng gói

mvn clean package

Pasted image 20250401152408.png

Chạy file

spark-submit --class spark.main.part_2 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401152335.png

Check output

Pasted image 20250401152201.png
Tỉ lệ khoảng 29,93 % khách hàng không có thông tin

Đâu là nước có số lượng đơn hàng (Quantity) nhiều thứ 3?

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_3 {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("Part-2")
				.master("local")
				.getOrCreate();
		
		Dataset<Row> data = spark.read()
				.option("inferSchema", true)
				.option("header", true)
				.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
				
		data.createOrReplaceTempView("data");
		spark.sql("select Country, sum(Quantity) as count from data group by Country order by count desc").show();
	}
}

Đóng gói

mvn clean package

Pasted image 20250401152811.png

Chạy file

spark-submit --class spark.main.part_3 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401152858.png
Pasted image 20250401152908.png
Pasted image 20250401152917.png
Pasted image 20250401152926.png

Check output

Pasted image 20250401152944.png
output sắp xếp từ cao đến thấp => nhiều thứ 3 là EIRE với count = 142637

Từ nào xuất hiện ít nhất trong phần Description?

Code

package spark.main;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;

public class part_4 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-4")
                .master("local")
                .getOrCreate();
        
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
                
        data.where("Description is not null").flatMap(new FlatMapFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;
            private int cnt = 0;
            
            @Override
            public Iterator<Row> call(Row r) throws Exception {
                List<String> listItem = Arrays.asList(r.getString(2).split(" "));
                
                List<Row> listItemRow = new ArrayList<Row>();
                for (String item : listItem) {
                    listItemRow.add(RowFactory.create(cnt, item, 1));
                    cnt++;
                }
                
                return listItemRow.iterator();
            }
        }, Encoders.row(new StructType()
                .add("number", "integer")
                .add("word", "string")
                .add("lit", "integer")))
        .createOrReplaceTempView("data");
        
        spark.sql("select word, count(lit) as count from data group by word order by count desc").show();
    }
}

Đóng gói

mvn clean package

Pasted image 20250401154302.png

Chạy file

spark-submit --class spark.main.part_4 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401154525.png
Pasted image 20250401154544.png
Pasted image 20250401154556.png
Pasted image 20250401154604.png

Check output

Pasted image 20250401154633.png

Đổi code thành asc để hiển thị từ thấp đến lớn

spark.sql("select word, count(lit) as count from data group by word order by count asc").show();

Pasted image 20250401154953.png
Những từ trong bảng là top những xuất hiện ít nhất trong phần description

Sản phẩm nào bán được số lượng (Quantity) lớn nhất ở United Kingdom?

Code

package spark.main;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class part_5 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Part-4")
                .master("local")
                .getOrCreate();
        
        Dataset<Row> data = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://vinhtuong-master:9000/input/retails.csv");
                
        data.filter(data.col("Country").equalTo("United Kingdom")).createOrReplaceTempView("data");
		spark.sql("select Description, sum(Quantity) as count from data group by Description order by count desc").show();
    
        }
}

Đóng gói

mvn clean package

Pasted image 20250401155357.png

Chạy file

spark-submit --class spark.main.part_5 target/SparkApp-1.0-SNAPSHOT.jar

Pasted image 20250401155419.png
Pasted image 20250401155429.png
Pasted image 20250401155439.png
Pasted image 20250401155454.png
Pasted image 20250401155507.png

Check output

Pasted image 20250401155529.png
check tên đầy đủ trong csv là WORLD WAR 2 GLIDERS ASSTD DESIGNS
với số lượng 48326


Spark Streaming - Dsteam (discretized steam)

Phiên bản 1: Xử lý theo từng batch

Lý thuyết

Dùng Spark để lấy dữ liệu streaming từ TCP socket (cổng được set là 9999)
Trên máy sẽ mở cổng 9999 để gõ vào

Mở cổng

Note: mở 1 terminal khác để chạy. Xem cái này như 1 cái server

nc -lk 9999

sau đó gõ vào những câu để count, với mỗi câu nhập vào được xem là 1 batch
Pasted image 20250402090121.png

Để kiểm tra xem đã mở được chưa

netstat -an | grep 9999

Nếu mở được sẽ có trạng thái là LISTEN

tcp        0      0 0.0.0.0:9999            0.0.0.0:*               LISTEN 

Pasted image 20250402090352.png

Code

package spark.main;

import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;


public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Set up Spark configuration and streaming context
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

        // Create a DStream that connects to a socket
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        System.out.println("Dang nhan du lieu tu socket...");
        
        lines.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Du lieu nhan duoc: ");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Process each RDD from the DStream
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
                                                           .reduceByKey((a, b) -> a + b);

        // Print the counts to the console
        wordCounts.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Word Count:");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Start the streaming computation
        jssc.start();
        System.out.println("Streaming Started");
        jssc.awaitTermination();

    }
}

Đóng gói

mvn clean package

Chạy file

spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
  • Lúc này chương trình sẽ chạy liên tục, cập nhật theo thời gian mình đã config ở dòng
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2)); //2 giây
  • Quay lại với terminal mở port 9999, gõ từng dòng vào

Kiểm tra output

  • Kiểm tra chương trình có chạy hay chưa, tìm "Dang nhan du lieu tu socket..."
    Pasted image 20250402091021.png
  • Kiểm tra streaming đang chạy, tìm "Streaming Started"
    Pasted image 20250402091513.png

Sau Khi kiểm tra có 2 dòng này thì chương trình đã chạy bình thường. Tiếp theo sẽ kiểm tra có nhận được dữ liệu từ cổng 9999 hay không và có word count được hay không

  • Kiểm tra có nhận được dữ liệu từ cổng hay không, tìm "Du lieu nhan duoc:"
    Pasted image 20250402091916.png
  • Kiểm tra word count được không, tìm "Word Count:"
    Pasted image 20250402092013.png

Phiên bản 2: Xử lý theo batch, tính dồn lại

Lý thuyết

Ý tưởng của phiên bản 1, nhưng kiểm tra có những từ gì xuất hiện trong toàn bộ quá trình streaming
Cập nhật thêm:

  • Dùng updateStateByKey() để giữ trạng thái cộng dồn số lần xuất hiện của từng từ.
  • Thêm checkpoint bằng jssc.checkpoint("checkpoint"); để Spark có thể duy trì trạng thái lâu dài.
  • Hàm updateFunction giúp Spark cập nhật tổng số lần xuất hiện từ trước đến hiện tại.

Code

package spark.main;

import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Set up Spark configuration and streaming context
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

        jssc.checkpoint("checkpoint");

        // Create a DStream that connects to a socket
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        System.out.println("Dang nhan du lieu tu socket...");
        
        lines.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Du lieu nhan duoc: ");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Process each RDD from the DStream
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                                 .map(word -> word.toLowerCase());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
                                                           .reduceByKey(Integer::sum);

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (newValues, state) -> {
            int sum = state.orElse(0);
            for (int val : newValues) {
                sum += val;
            }
            return Optional.of(sum);
        };

        JavaPairDStream<String, Integer> cumulativeWordCounts = wordCounts.updateStateByKey(updateFunction);


        // Print the counts to the console
        cumulativeWordCounts.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                System.out.println("Word Count:");
                rdd.collect().forEach(System.out::println);
            }
        });

        // Start the streaming computation
        jssc.start();
        System.out.println("Streaming Started");
        jssc.awaitTermination();
    }
}

Đóng gói

mvn clean package

Chạy file

spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
  • Quay lại với terminal mở port 9999, gõ từng dòng vào

Kiểm tra output

  • Dữ liệu nhập vào
    Pasted image 20250402101418.png
  • Kiểm tra word count được không, tìm "Word Count:"
    Pasted image 20250402101435.png

RDD

map

Code

package spark.main;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class Map {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
		
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Integer> list = Arrays.asList(10,20,30);
			JavaRDD<Integer> data = sc.parallelize(list);
			
			data = data.map(new Function<Integer, Integer>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Integer call(Integer v1) throws Exception {
					return v1 * 2;
				}
				
			});
			
			data.collect().forEach(v -> System.out.println(v));
		}
		
	}
}

Kết quả

Pasted image 20250402110812.png
10 20 30 * 2 = 20 40 60

filter

Code

package spark.main;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class Filter {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
	
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Integer> list = Arrays.asList(10,11,12,13,14,15);
			JavaRDD<Integer> data = sc.parallelize(list);
			
			data = data.filter(new Function<Integer, Boolean>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Boolean call(Integer v1) throws Exception {
					if(v1 % 5 == 0) return true;
					return false;
				}
			});
			
			data.collect().forEach(v -> System.out.println(v));
		}
	}
}

Kết quả

Pasted image 20250402111534.png

groupByKey

Code

package spark.main;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class GroupByKey {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
	
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Tuple2<String, Integer>> list = Arrays.asList(
					new Tuple2<String, Integer>("C", 3), 
					new Tuple2<String, Integer>("A", 1), 
					new Tuple2<String, Integer>("B", 4), 
					new Tuple2<String, Integer>("A", 2), 
					new Tuple2<String, Integer>("B", 5));
			
			JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
			data.groupByKey().collect().forEach(s -> System.out.println(s));
			
		}
	}
}

Kết quả

Pasted image 20250402112033.png

reduceByKey

Code

package spark.main;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class ReduceByKey {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
	
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Tuple2<String, Integer>> list = Arrays.asList(
					new Tuple2<String, Integer>("C", 3), 
					new Tuple2<String, Integer>("A", 1), 
					new Tuple2<String, Integer>("B", 4), 
					new Tuple2<String, Integer>("A", 2), 
					new Tuple2<String, Integer>("B", 5));
			
			JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
			data = data.reduceByKey(new Function2<Integer, Integer, Integer>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Integer call(Integer v1, Integer v2) throws Exception {
					return v1 + v2;
				}
			});
			
			data.collect().forEach(v -> System.out.println(v));
			
		}
	}
}

Kết quả

Pasted image 20250402112527.png


# Project Log Analyzer với Spark Streaming

Cài đặt môi trường

Scala và sbt

  • SBT
sudo apt-get update
sudo apt-get install apt-transport-https curl gnupg -yqq
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update
sudo apt-get install sbt
sbt --version

Pasted image 20250402133340.png

  • Scala đã cài đặt trước đó
    Pasted image 20250402133430.png

Buid dự án

sbt clean package

Note:Khi buid lần đầu cần tải nhiều thứ về nên buid lâu với tải 1 nùi về
Pasted image 20250402133621.png
Pasted image 20250402133649.png

kết quả trả về

để tìm kết quả search "top endpoint"
Pasted image 20250402135436.png
Pasted image 20250402164125.png


Spark MLib -

Chuẩn bị các file

  • Dữ liệu trong bài tải tại đây
  • File lưu cái biến: constants.py
PATH="/input/LDA/"
NUM_TOPICS=3
MAX_INTER=10
OUTPUT_PATH="/output/LDA/"
  • File tiền xử lý dữ liệu preprocessing.py
import nltk
from nltk.corpus import stopwords
import re as re

nltk.data.path.append("/home/hadoopvinhtuong/nltk_data")
def preprocessing(rdd):
    # pre processing data
    reviews = rdd.map(lambda x: x['Content']).filter(lambda x: x is not None)
    StopWords = stopwords.words("english")
    tokens = reviews.map(lambda document: document.strip().lower())
    tokens = tokens.map(lambda document: re.split(" ", document))
    tokens = tokens.map(lambda word: [x for x in word if x.isalpha()])
    tokens = tokens.map(lambda word: [x for x in word if len(x) > 3])
    tokens = tokens.map(lambda word: [x for x in word if x not in StopWords])

    return tokens

-File code chính LDA.py

import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

from preprocessing import preprocessing
import constants

# init Spark Context
conf = SparkConf().setAppName("Spark ML").setMaster("local[2]")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# read raw data
data = sqlContext.read.format("csv").options(header='true', inferSchema='true').load(os.path.realpath(constants.PATH))

rdd = data.rdd

# preprocessing data
tokens = preprocessing(rdd)
tokens = tokens.zipWithIndex()

df = sqlContext.createDataFrame(tokens, ["content", "index"])

# vector data
cv = CountVectorizer(inputCol="content", outputCol="features", vocabSize=500, minDF=3.0)
cvModel = cv.fit(df)
vectorizedToken = cvModel.transform(df)

# clustering
lda = LDA(k=constants.NUM_TOPICS, maxIter=constants.MAX_INTER)
model = lda.fit(vectorizedToken)

# get vocab
vocab = cvModel.vocabulary
topics = model.describeTopics()
topicsRdd = topics.rdd

# result
result = model.transform(vectorizedToken)
result.show()

# save model
if(os.path.isdir(constants.OUTPUT_PATH + "/Model_CountVectorizer")):
	shutil.rmtree(constants.OUTPUT_PATH + "/Model_CountVectorizer")
cvModel.save(constants.OUTPUT_PATH + "/Model_CountVectorizer")

if(os.path.isdir(constants.OUTPUT_PATH + "/Model_LDA")):
	shutil.rmtree(constants.OUTPUT_PATH + "/Model_LDA")
model.save(constants.OUTPUT_PATH + "/Model_LDA")

Pasted image 20250402161316.png

Submit code

spark-submit LDA.py

Pasted image 20250402161520.png
Pasted image 20250402161539.png
Pasted image 20250402161553.png

Kết quả đầu ra

Pasted image 20250402160959.png
Pasted image 20250402161008.png

  • Model_CountVectorizer/data
    Pasted image 20250402161827.png

  • Model_CountVectorizer/metadata
    Pasted image 20250402161859.png

  • Model_LDA/data
    Pasted image 20250402162201.png

  • Model_LDA/metadata
    Pasted image 20250402162222.png

=> Đã lưu model vào hdfs

Nâng cấp thành app với Flask

tạo file mới với tên app.py

Code

from pyspark.sql import SparkSession
from flask import Flask, request
from pyspark.ml.clustering import LocalLDAModel
from pyspark.ml.feature import CountVectorizerModel

import constants
from preprocessing import preprocessing

spark = SparkSession.builder \
    .appName("Spark MLlib with Flask") \
    .master("local[2]") \
    .getOrCreate()

# Khởi tạo Flask app
app = Flask(__name__)

@app.route("/api/predict")
def predict():
    document = request.args.get("document")
    
    # Tải mô hình CountVectorizer và LDA đã lưu
    countVectorizerModel = CountVectorizerModel.load(constants.OUTPUT_PATH + "/Model_CountVectorizer")
    ldaModel = LocalLDAModel.load(constants.OUTPUT_PATH + "/Model_LDA")

    # Tạo DataFrame từ văn bản đầu vào
    documentDF = spark.createDataFrame([(document, )], ["Content"])
    rdd = documentDF.rdd
    tokens = preprocessing(rdd)
    tokens = tokens.zipWithIndex()

    df = spark.createDataFrame(tokens, ["content", "index"])
    vectorizedToken = countVectorizerModel.transform(df)

    # Dự đoán chủ đề bằng mô hình LDA
    result = ldaModel.transform(vectorizedToken)
    result = result.select("topicDistribution")
    result.show(truncate=False)

    pred = result.rdd.first()
    return {"predict": find_max_index(pred['topicDistribution'])}

@app.route("/")
def home():
    return "Flask app is running!"


def find_max_index(arr):
    index = 0
    max_val = 0

    for i in range(len(arr)):
        if arr[i] > max_val:
            max_val = arr[i]
            index = i

    return index

if __name__ == "__main__":
    app.run(debug=True)

Submit code

python3 app.py 

Pasted image 20250402184529.png

  • sau khi xuất hiện như vậy thì đã chạy app thành công và sẽ lắng nghe trên port 5000
  • Để tương tác thì đơn giản nhất là dùng postman

Dùng postman để tương tác

http://localhost:5000/api/predict?document=Hôm nay tôi đi Hà Nội để gập các cán bộbộ hello xin chào các anh để các em có thể ra ngoài

Pasted image 20250403072025.png